Dask

Dask is a Python library for parallel computation on big data. This notebook will let you try some of the basics.

Getting set up

First off, let's install some dependencies and import the Python libraries we need.


In [ ]:
!pip install castra graphviz  # extra Python dependencies (graphviz is needed for task graph visualisation)
!apt-get install -y graphviz  # system graphviz, used to render the task graphs

# render plots inline in the notebook
%matplotlib inline
import dask # for parallel computing
from distributed import Executor, progress # for distributed parallel computing

The first thing to do is create a Dask Executor - the object we'll use to submit jobs to our cluster.


In [ ]:
e = Executor('dask.informaticslab.co.uk:8786')
print(e)

You should now see how many nodes are available to the Executor. Ask Niall to show you how we scale this up and down using AWS.
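
If you want to inspect the cluster from code rather than from the printed summary, something like the sketch below should work (this assumes the version of distributed in use exposes Executor.ncores(); treat it as illustrative).


In [ ]:
# list the workers the scheduler knows about and how many cores each offers
workers = e.ncores()
print(len(workers), "workers available")
print(workers)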

Execute a function on the nodes

We're going to write a simple function which we can execute on the nodes. The code in the cell below just gets the IP address of the computer it's run on.


In [ ]:
import socket
getIP = lambda: "Hello world from IP " + socket.gethostbyname(socket.gethostname())

getIP()

The cell above was just run on the same computer that this Notebook is running on. Let's submit it to the compute cluster using our Executor e.

You should also open the scheduler's web interface and click on Status, where you can watch all the jobs (yours and other people's) being submitted to the compute nodes.


In [ ]:
e.run(getIP)
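
The e.run call above executes the function on every worker at once. For ordinary work you would normally submit a single task and let the scheduler pick a worker for it - a minimal illustrative sketch using the Executor's submit/result API (not part of the original exercise):


In [ ]:
# submit one task to the scheduler and wait for its result
future = e.submit(getIP)   # returns immediately with a Future
print(future.result())     # blocks until a worker has run getIP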

Delayed functions

Have a look at the cell below - it defines some basic arithmetic operations.


In [ ]:
def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

def process_list(xs):
    output = []
    for x in xs:
        a = inc(x)
        b = double(x)
        c = add(a, b)
        output.append(c)
    return output

def mysum(xs):
    total = 0
    for x in xs:
        total += x

    return total

In [ ]:
double(2)

In [ ]:
inc(double(2))

In [ ]:
data = [1, 2, 3, 4, 5]

result = mysum(process_list(data))

In [ ]:
print(result)
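
Each element x contributes inc(x) + double(x) = (x + 1) + 2x = 3x + 1, so for data = [1, 2, 3, 4, 5] the value printed above should be 3 × 15 + 5 = 50.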

So far, so normal. Now for the magic of the wonderful dask.delayed decorator.

If we decorate the same functions, they become delayed functions. This means they are not executed immediately; instead they sit waiting to be mapped onto our Executor e.


In [ ]:
@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def double(x):
    return x * 2

@dask.delayed
def add(x, y):
    return x + y

def process_list(xs):
    output = []
    for x in xs:
        a = inc(x)
        b = double(x)
        c = add(a, b)
        output.append(c)
    return dask.delayed(output)

@dask.delayed
def mysum(xs):
    total = 0
    for x in xs:
        total += x

    return total
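
Besides decorating functions, dask.delayed can wrap a plain value - that is what process_list does with its output list, turning a list of Delayed results into a single Delayed object. A tiny illustrative sketch (the values are arbitrary):


In [ ]:
# wrap a list of delayed results in one Delayed object - still nothing is computed
lazy_list = dask.delayed([inc(1), double(2)])
lazy_total = mysum(lazy_list)    # this only builds the task graph
print(lazy_total.compute())      # now the work runs: (1 + 1) + (2 * 2) = 6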

In [ ]:
result = double(2)

In [ ]:
print(result)

In [ ]:
print(result.compute())

In [ ]:
data = [1, 2, 3, 4, 5]

result = mysum(dask.delayed(process_list(data)))

In [ ]:
result

In [ ]:
result.visualize()

In [ ]:
result.compute()
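
Calling .compute() like this may run the graph with the local scheduler. To be explicit about running it on the cluster, you can hand the Delayed object to our Executor e - a sketch, assuming e.compute accepts Delayed objects, and using the progress helper we imported at the top:


In [ ]:
# run the same graph on the cluster rather than locally
future = e.compute(result)   # hand the Delayed object to the scheduler
progress(future)             # live progress bar in the notebook
future.result()              # should again give 50 for data = [1, 2, 3, 4, 5]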

With more data the task graph grows, and Dask automatically optimises it before execution.


In [ ]:
data = range(300)

result = mysum(dask.delayed(process_list(data)))
result.visualize() #be patient!
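
As a sanity check, the computed value should match a plain Python sum of 3x + 1 over the same range:


In [ ]:
# each element contributes inc(x) + double(x) = 3x + 1
print(result.compute())
print(sum(3 * x + 1 for x in data))   # should print the same number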